from ma.commons.core.newjobaction import NewJobAction
from ma.commons.core.newmaptaskaction import NewMapTaskAction
from ma.commons.core.newreducetaskaction import NewReduceTaskAction
from ma.commons.core.constants import * 
from ma.commons.core.nodetrackerstatus import *
from ma.commons.core.completejobsaction import CompleteJobsAction
import ma.log
import ma.const
from ma.fs.dfs.dfsflags import *
from ma.fs.dfs._hdfs import HDFS
from .taskscheduler import TaskScheduler
from .nodetracker import *

import logging
import logging.config
from threading import Thread
import pickle
import sys
import os
import time


class Master(object):
	"""Master node of the MapReduce cluster.

	Responsibilities:
	1. track MR jobs,
	2. track MR tasks on the various NodeTrackers (NTs),
	3. monitor the NTs' health,
	4. report progress on MR jobs to the user,
	5. interact with the HDFS on the NTs' behalf.
	"""


	def __init__(self, forced_job_id=None):
		"""Initiate the Master and all its bookkeeping structures.

		forced_job_id -- when given, the DFS-flags layer is forced to use
		this job id (handy for re-running a specific job).
		"""

		#initializing the logger
		self.__log = ma.log.get_logger('ma.master')

		# next id handed out to a registering NodeTracker
		self.newNTId = 0

		# layer that speaks to the HDFS
		host = ma.const.XmlData.get_str_data(ma.const.xml_hdfs_host)
		port = ma.const.XmlData.get_int_data(ma.const.xml_hdfs_port)
		self.hdfs = HDFS(host, port)

		self.dfsflags = DfsFlags(self.hdfs)
		if forced_job_id is not None:
			self.__log.warning('Forcing job id: %d', forced_job_id)
			self.dfsflags.force_job_id(forced_job_id)

		# job_id -> list of task stubs; stub layout is documented in
		# createMapStubs() / createReduceStubs()
		self.maps_of_jobs = {}
		self.reduces_of_jobs = {}

		list_of_jobs = self.dfsflags.get_active_job_list()

		self.threads = []

		# job_id -> [all_maps_picked_up_flag]
		self.jobs = {}
		#dict of recent complete jobs per NT since that NT's last heartbeat
		#NT_id -> [] of recent completed jobs
		self.complete_jobs = {}

		# holds info on active jobs currently whether its map phase has completed or not
		for job in list_of_jobs:
			self.jobs[job] = [False]
			self.localizeJob(job)

		# creates all the map and reduce stubs before the job can begin
		self.createMapStubs(list_of_jobs)
		self.createReduceStubs(list_of_jobs)

		# this is the task scheduler that suggests what job to be picked up next
		self.task_scheduler = TaskScheduler(self.jobs)

		# this dict is task_id -> TIP (task in progress)
		self.running_tasks = {}

		# this dict is task_id -> TIP
		self.failed_tasks = {}

		# this dict is task_id -> TIP
		self.completed_tasks = {}

		#tasks running on which NTs. task_id -> NT_id
		self.task_to_NT = {}

		#NT_id -> NodeTrackerStatus
		self.live_NTs = {}

		self.check = 0

		# NT_id -> [maps completed since that NT's last heartbeat, as
		# tuples (task_id, job_id, nodetracker_ip)] (cleared at every
		# heartbeat call)
		self.completedMapstoNTDict = {}

		self.temp_job_complete = False

		# TODO: REMOVE temp output writing to file
		# temp file creation for keeping track of the final reduces
		fd = open('job_complete.out', 'w+')
		fd.close()


	def createMapStubs(self, list_of_jobs):
		"""Create map-task stubs for every job in list_of_jobs.

		Each stub is [state, map_info, worker_ip] where map_info is the
		tuple (map_id, struct_id, [inputs (filename, offset, size)])
		returned by the DFS flags layer and state is 0 = unassigned,
		1 = assigned, 2 = complete.
		"""

		for job in list_of_jobs:
			maps, maps_complete = self.dfsflags.choose_map_tasks(20000, job)
			# worker_ip starts empty and is filled in on completion
			self.maps_of_jobs[job] = [[0, map_info, ""] for map_info in maps]


	def createReduceStubs(self, list_of_jobs):
		"""Create reduce-task stubs for every job in list_of_jobs.

		Each stub is [state, (reduce_id, input_data)] where input_data is
		the list of (job_id, MAP, map_id) map outputs the reduce consumes
		and state is 0 = unassigned, 1 = assigned, 2 = complete.
		"""

		for job in list_of_jobs:
			no_of_reduces = ma.const.JobsXmlData.get_int_data(ma.const.xml_no_reduces, job)
			# every reduce of this job reads the output of every map
			input_data = [(job, MAP, task_data[1][0]) for task_data in self.maps_of_jobs[job]]
			self.reduces_of_jobs[job] = [[0, (i, input_data)] for i in range(no_of_reduces)]


	def run(self):
		"""this func runs forever it starts before any other thread begins
		"""
		# NOTE(review): intentionally empty stub in the original source


	def retrieveTaskJobId(self, task_id):
		"""Split a composite task-id string into its components.

		Inverse of constructTaskId(). Returns (job_id, task_id,
		map_or_reduce) with the first two converted to int.
		"""

		# NOTE: the original built an unused `ids` list from the loop
		# indices (a latent bug); that dead code has been removed
		parts = task_id.split(TASK_ID_SEP)

		#returns as int-s job_id, Task_id and map_or_reduce
		return int(parts[0]), int(parts[1]), parts[2]


	def constructTaskId(self, job_id, task_id, m_or_r):
		"""this func returns a single string which can be deciphered to the
		job_id, task_id and the task type using the retrieveTaskJobId()
		function
		"""

		# returns the task_id string
		return str(job_id) + TASK_ID_SEP + str(task_id) + TASK_ID_SEP + str(m_or_r)


	def getActiveJobs(self):
		"""Refresh self.jobs against the HDFS's active-job list.

		New jobs are localized and get their map/reduce stubs created;
		jobs that are no longer active are removed and announced to every
		NT through its complete_jobs list.
		"""

		print("At getActiveJobs: jobs in queue" , self.jobs)

		new_jobs = []
		list_of_jobs = self.dfsflags.get_active_job_list()

		#filling in active jobs list from the HDFS
		for i in list_of_jobs:
			if i not in self.jobs:
				self.jobs[i] = [False]
				new_jobs.append(i)
				self.localizeJob(i)

		self.createMapStubs(new_jobs)
		self.createReduceStubs(new_jobs)

		inactive_jobs = []
		#removing inactive jobs from the master
		for j in self.jobs:
			if j not in list_of_jobs:
				inactive_jobs.append(j)
				del self.maps_of_jobs[j]
				del self.reduces_of_jobs[j]
				# BUGFIX: complete_jobs is a dict (NT_id -> list); the
				# original called .append() on the dict itself, which
				# raised AttributeError. Announce the job to every NT
				# instead, mirroring markJobComplete().
				for nt_id in self.complete_jobs:
					self.complete_jobs[nt_id].append(j)

		for i in inactive_jobs:
			del self.jobs[i]

		print("At getActiveJobs: after refreshing jobs in queue" , self.jobs)


	def mark_task_complete(self, task_id, nodetracker_ip="0.0.0.0"):
		"""Move a running task to the completed dict and update job state.

		task_id -- composite string produced by constructTaskId()
		nodetracker_ip -- IP of the NT that finished the task
		Returns True when the task was known to be running, else False.
		"""

		jobid, taskid, map_or_reduce = self.retrieveTaskJobId(task_id)

		if task_id in self.running_tasks:
			self.completed_tasks[task_id] = self.running_tasks.pop(task_id)
		else:
			# unknown or already-finished task: nothing to do
			return False

		if map_or_reduce == MAP:
			# state 2 == complete; remember which NT holds the map output
			self.maps_of_jobs[jobid][taskid][0] = 2
			self.maps_of_jobs[jobid][taskid][2] = nodetracker_ip

			self.__log.info("Mark MAP complete called for %s job-id %s", str(task_id), str(jobid))
			# queue the finished map for every NT's next heartbeat reply
			for tracker_id in self.completedMapstoNTDict:
				self.completedMapstoNTDict[tracker_id].append((taskid, jobid, nodetracker_ip))

			# check if all maps have been picked up
			if jobid in self.maps_of_jobs:
				all_maps_picked_up = True
				for map_stub in self.maps_of_jobs[jobid]:
					if map_stub[0] == 0:
						all_maps_picked_up = False
						break
				if all_maps_picked_up:
					self.markAllMapsPickedUp(jobid)
			return True
		else:
			self.reduces_of_jobs[jobid][taskid][0] = 2
			self.__log.info("Mark REDUCE complete called for %s job-id %s", str(task_id), str(jobid))
			# the job is done once every reduce has reached state 2
			reduces_not_complete = False
			for reduce_stub in self.reduces_of_jobs[jobid]:
				if reduce_stub[0] < 2:
					reduces_not_complete = True
					break
			if not reduces_not_complete:
				self.markJobComplete(jobid)
			return True
		# NOTE: the original had an unreachable `return False` here


	def mark_task_failed(self, task_id):
		"""Move a running task to the failed dict.

		Returns True when the task was known to be running, else False.
		"""

		if task_id in self.running_tasks:
			self.failed_tasks[task_id] = self.running_tasks.pop(task_id)
			return True
		return False


	def choose_map_tasks(self, tasks_to_run, job_id, tracker_id):
		"""Assign up to tasks_to_run unassigned maps of job_id to an NT.

		Returns a list of (map_id, struct_id, [inputs (filename, offset,
		size)]) tuples for the maps just assigned to tracker_id.
		"""

		list_of_maps = []

		if job_id in self.maps_of_jobs:

			for index, map_stub in enumerate(self.maps_of_jobs[job_id]):

				if map_stub[0] == 0:
					if tasks_to_run <= 0:
						break
					list_of_maps.append(map_stub[1])
					map_stub[0] = 1
					tasks_to_run -= 1

					nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
					self.__log.info("Assigned Map task %s with job_id %s to NT %s (%s)", str(index), str(job_id), str(tracker_id), nodetracker_ip)

					task_id = self.constructTaskId(job_id, index, MAP)
					# placeholder TIP value; overwritten with the real
					# NewMapTaskAction in assignNewTasksToNT()
					self.running_tasks[task_id] = "momina"

		return list_of_maps


	def choose_reduce_tasks(self, tasks_to_run, job_id, tracker_id):
		"""Assign up to tasks_to_run unassigned reduces of job_id to an NT.

		Returns a list of (reduce_id, input_data) tuples for the reduces
		just assigned to tracker_id.
		"""

		list_of_reduces = []
		if job_id in self.reduces_of_jobs:
			for index, reduce_stub in enumerate(self.reduces_of_jobs[job_id]):
				if reduce_stub[0] == 0:
					if tasks_to_run <= 0:
						break
					list_of_reduces.append(reduce_stub[1])
					reduce_stub[0] = 1
					tasks_to_run -= 1

					nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
					self.__log.info("Assigned Reduce task %s with job_id %s to NT %s (%s)", str(index), str(job_id), str(tracker_id), nodetracker_ip)

					task_id = self.constructTaskId(job_id, index, REDUCE)
					# placeholder TIP value; overwritten with the real
					# NewReduceTaskAction in assignNewTasksToNT()
					self.running_tasks[task_id] = "ahmad"

		return list_of_reduces


	def assignNewTasksToNT(self, tasks_to_run, actions, tracker_id):
		"""this func will assign new tasks to the NT

		the following bit of code will get
		1) a preference list of tasks from the TaskScheduler
		2) try and book these tasks for this particular NT through DFSFlags
		3) make ReduceTip and MapTip of these tasks
		   objects to be passed to the NT to run as action objects!

		actions is mutated in place; nothing is returned.
		"""

		if tasks_to_run == 0:
			return

		#get job_id and a list of suggested map or reduce tasks from the TasksScheduler
		#for that job
		job_id, map_or_red = self.task_scheduler.scheduleTasks(self.jobs)

		nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip
		self.__log.info("Scheduling %s tasks of job-id %s for NT %s (%s)", map_or_red, str(job_id), str(tracker_id), nodetracker_ip)

		if job_id == -1:
			#means no job in job list
			return

		# maps are only handed out while the job's map phase is still open
		if self.jobs[job_id][0] == False and map_or_red == MAP:
			list_of_tasks_assigned = self.choose_map_tasks(tasks_to_run, job_id, tracker_id)

			self.__log.info("%d Maps scheduled for NT %s (%s)", len(list_of_tasks_assigned), str(tracker_id), nodetracker_ip)
			if list_of_tasks_assigned is None:
				return

			#task info tuple contains (map_id, struct_id, [inputs (filename,offset, size)])
			for task_info in list_of_tasks_assigned:

				#GET MAP TASK SPECIFIC INFO FROM ITS XML
				map_task = NewMapTaskAction(task_info[0], job_id, task_info[2], task_info[1])
				#ADD TO ACTIONS LIST
				actions.append(map_task)

				# replace the placeholder TIP with the real action object
				task_id = self.constructTaskId(job_id, task_info[0], MAP)
				self.running_tasks[task_id] = map_task

		else: #if task is a reduce
			list_of_tasks_assigned = self.choose_reduce_tasks(tasks_to_run, job_id, tracker_id)

			self.__log.info("%d Reduces scheduled for NT %s (%s)", len(list_of_tasks_assigned), str(tracker_id), nodetracker_ip)
			if list_of_tasks_assigned is None:
				return

			#task info tuple contains (reduce_id, input_data)
			for task_info in list_of_tasks_assigned:
				reduce_task = NewReduceTaskAction( job_id,task_info[0], task_info[1])
				#ADD TO ACTIONS LIST
				actions.append(reduce_task)

				# replace the placeholder TIP with the real action object
				task_id = self.constructTaskId(job_id, task_info[0], REDUCE)
				self.running_tasks[task_id] = reduce_task


	def heartbeatFromNT(self, taskinfo_string, tasks_to_run, tracker_id):
		"""this func will be periodically called from every NT to pass task info
		objects to the master so that it can update its status info of tasks
		running on this NT onto its task dicts
		this function will return an actions list of objects dictating what actions
		the NT is to perform till the next heartbeat

		Returns (pickled actions list, recently completed maps,
		recently completed jobs) for this NT.
		"""

		# SECURITY NOTE: pickle.loads on data from the network is unsafe
		# if an NT can be impersonated by an untrusted party
		taskinfo_list = pickle.loads(taskinfo_string)

		#list of actions to be returned
		actions = []

		# get the ip of the nodetracker whose heartbeat is coming
		nodetracker_ip = self.live_NTs[tracker_id].nodetracker_ip

		#assigning tasks to the NT according to the number of fresh tasks it can run
		self.assignNewTasksToNT(tasks_to_run, actions, tracker_id)

		#placing complete and failed tasks info from the NT onto the hDFS
		for i in taskinfo_list:
			task_id_tag = self.constructTaskId(i.job_id, i.task_id, i.map_or_reduce)

			if i.type == COMPLETEDTASK_INFO_TYPE:
				#mark task as complete on the master
				self.mark_task_complete(task_id_tag, str(nodetracker_ip))
				self.__log.debug("A %s task with job_id %s and task_id %s reported complete by NT %s (%s)", str(i.map_or_reduce), str(i.job_id), str(i.task_id), str(tracker_id), nodetracker_ip)

			if i.type == FAILEDTASK_INFO_TYPE:
				#mark task as failed on the master
				self.mark_task_failed(task_id_tag)
				self.__log.error("A %s task with job_id %s and task_id %s reported failed by NT %s (%s)", str(i.map_or_reduce), str(i.job_id), str(i.task_id), str(tracker_id), nodetracker_ip)

		recent_maps_completed_list = self.getRecentDoneMapsForNT(tracker_id)

		# get recent completed jobs for NT (and reset its queue)
		recent_jobs_completed = self.complete_jobs[tracker_id]
		self.complete_jobs[tracker_id] = []

		if self.temp_job_complete:
			print("###########################################################")
			print("========JOB COMPLETED ======================================")
			print("#############################################################")

			# TODO: REMOVE temp output writing to file
			# temp file creation for keeping track of the final reduces
			fd = open('job_complete.out', 'a')
			fd.write('Job complete, ' + str(time.time()) + '\n')
			fd.close()

		actions_str = pickle.dumps(actions)
		return actions_str, recent_maps_completed_list, recent_jobs_completed


	def getRecentDoneMapsForNT(self, tracker_id):
		"""Return and clear the maps completed since tracker_id's last
		heartbeat; an unknown tracker gets an empty list.
		"""

		if tracker_id in self.completedMapstoNTDict:
			done_maps = self.completedMapstoNTDict[tracker_id]
			self.completedMapstoNTDict[tracker_id] = []
			return done_maps
		else:
			return []


	def registerNT(self, nodetracker_ip):
		"""this method will register an NT with the master as soon as it comes up

		Returns the newly assigned tracker id.
		"""

		# get the next tracker-id and increment id
		tracker_id = self.newNTId
		self.newNTId += 1

		tracker_status = NodeTrackerStatus(tracker_id, nodetracker_ip)
		self.live_NTs[tracker_id] = tracker_status

		#update NT entry in self.completedMapstoNTDict used to send completed maps since last heartbeat
		self.completedMapstoNTDict[tracker_id] = []

		self.complete_jobs[tracker_id] = []

		self.__log.info("New tracker %s (%s) reporting for duty", str(tracker_id), nodetracker_ip)

		return tracker_id


	def removeNT(self, tracker_id):
		""" this function will remove an NT from the live NTs dict if the former
		has expired
		"""
		# NOTE(review): intentionally empty stub in the original source

	def markAllMapsPickedUp(self,job_id):
		"""Record that every map of job_id has been handed out so the
		scheduler can move on to the reduce phase.
		"""

		self.jobs[job_id] = [True]


	def markJobComplete(self,job_id):
		"""Mark job_id complete and queue the news for every NT's next
		heartbeat reply.
		"""

		for nt_id in self.complete_jobs:
			self.complete_jobs[nt_id].append(job_id)
		print("###########################################################")
		print("========JOB COMPLETED ======================================")
		print("#############################################################")
		self.temp_job_complete = True


	def localizeJob(self, job_id):
		"""Run for copying all essential files from the DFS to the local
		directory whenever this NT gets a job whose task it hasn't done
		before. It also creates the relevant directories in the local FS
		"""

		try:
			self.__log.info("Initiating / copying config files for Job id %d", job_id)

			# get consts
			input_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id)
			output_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
			compute_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_compute_temp_dir, job_id)

			# create directories
			if not os.path.isdir(input_path):
				os.makedirs(input_path)
			if not os.path.isdir(output_path):
				os.makedirs(output_path)
			if not os.path.isdir(compute_path):
				os.makedirs(compute_path)

			# copy job conf
			job_conf_local_dir = os.path.dirname(ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id))
			job_conf_dfs_filepath = ma.const.JobsXmlData.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_conf, job_id)

			if self.hdfs.check_path(job_conf_dfs_filepath) == 1:
				# delete if already exists
				local_jobconf_filepath = job_conf_local_dir + os.sep + os.path.basename(job_conf_dfs_filepath)
				if os.path.isfile(local_jobconf_filepath):
					os.remove(local_jobconf_filepath)

				self.hdfs.copy_file_to_local(job_conf_dfs_filepath, job_conf_local_dir, job_id, auto_retry=True)
			else:
				self.__log.error('Job Conf for job_id %d does not exist', job_id)

			# reinitialize the jobs XML data to remove all previous values
			ma.const.JobsXmlData.reinitialize(job_id)

		except Exception as err_msg:
			# broad catch is deliberate: localization failure must not
			# bring the master down; it is logged and the job is skipped
			self.__log.error('Error while localizing new job: %s', err_msg)


def main(argv=None):
	"""Start the master's XML-RPC server and serve forever.

	argv -- optional argument list (defaults to sys.argv); a single
	extra argument is interpreted as a job id to force on the Master.
	"""

	from xmlrpc.server import SimpleXMLRPCServer

	# BUGFIX: the original accepted argv but always read sys.argv;
	# honor the parameter so main() can be driven programmatically
	if argv is None:
		argv = sys.argv

	master_ip = ma.const.XmlData.get_str_data(ma.const.xml_master_ip)
	master_port = ma.const.XmlData.get_int_data(ma.const.xml_master_port)
	# Create server
	server = SimpleXMLRPCServer((master_ip, master_port))
	server.register_introspection_functions()

	if len(argv) == 2:
		forced_job_id = int(argv[1])
		print('--> Forced job id', forced_job_id)
		server.register_instance(Master(forced_job_id))
	else:
		server.register_instance(Master())

	# Run the server's main loop
	server.serve_forever()
if __name__ == "__main__":
	# Script entry point: run the master's XML-RPC server loop.
	sys.exit(main())
		

	